package com.wuuxiang.i5xforyou.websocket;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wuuxiang.i5xforyou.websocket.rabbitmq.WebSocketRabbitMqBridge;
import com.wuuxiang.i5xforyou.websocket.rabbitmq.kanjia.KanjiaMsgHandler;
import com.wuuxiang.i5xforyou.websocket.util.SpringContext;

/**
 * WebSocket server endpoint mounted at {@code /ws/{biz}/{key}}.
 *
 * <p>Every accepted connection is registered under an address of the form
 * {@code biz.key} (for example {@code duorendiancai.4991_123}). Text messages
 * received from a client are parsed as JSON, tagged with the connection's
 * address and handed to {@link WebSocketRabbitMqBridge}; {@link #publish}
 * broadcasts a text message to every connection registered under an address.
 *
 * @author Mobile Web Group-lff
 * @date 2018-02-11 09:53:11
 */
@Controller
@ServerEndpoint("/ws/{biz}/{key}")
public class WebSocket {
	private static final Logger log = LoggerFactory.getLogger(WebSocket.class);

	/** Live connections grouped by address ({@code biz.key}). */
	private static final ConcurrentMap<String, CopyOnWriteArrayList<WebSocket>> webSocketMap = new ConcurrentHashMap<String, CopyOnWriteArrayList<WebSocket>>();

	/**
	 * Guards the compound "look up list, then add/remove, then create/drop the
	 * map entry" sequences in {@link #onOpen} and {@link #onClose}, so that a
	 * connection cannot be added to a list that is concurrently being removed
	 * from the map. Readers ({@link #publish}) do not take this lock.
	 */
	private static final Object REGISTRY_LOCK = new Object();

	private Session session;

	/** Message address: {@code biz + "." + key}; stays {@code null} if the connection was rejected. */
	private String address;

	/** Bridge used to forward WebSocket messages to RabbitMQ. */
	private WebSocketRabbitMqBridge webSocketRabbitMqBridge = SpringContext.getBean(WebSocketRabbitMqBridge.class);


	/**
	 * Validates the path parameters and registers this connection under its address.
	 *
	 * <p>If either parameter is blank the session is closed with
	 * {@code CANNOT_ACCEPT} and the connection is NOT registered.
	 *
	 * @param biz     business discriminator taken from the URL path
	 * @param key     business key taken from the URL path
	 * @param session the newly opened WebSocket session
	 */
	@OnOpen
	public void onOpen(@PathParam("biz") String biz, @PathParam("key") String key, Session session) {
		// Reject connections with missing path parameters.
		if (StringUtils.isBlank(biz) || StringUtils.isBlank(key)) {
			String msg = "websocket连接参数不合法，biz=" + biz + ",key=" + key;
			log.error(msg);
			try {
				session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, msg));
			} catch (IOException e) {
				log.error("关闭websocket连接异常:" + e.toString(), e);
			}
			// Must stop here: a rejected session must never end up in the registry.
			return;
		}

		this.address = biz + "." + key;
		this.session = session;

		// Register atomically with respect to other onOpen/onClose calls.
		synchronized (REGISTRY_LOCK) {
			CopyOnWriteArrayList<WebSocket> webSocketList = webSocketMap.get(this.address);
			if (webSocketList == null) {
				webSocketList = new CopyOnWriteArrayList<WebSocket>();
				webSocketMap.put(this.address, webSocketList);
			}
			webSocketList.add(this);
		}
	}

	/**
	 * Forwards a client text message to RabbitMQ.
	 *
	 * <p>The message must be a JSON object; the connection's address is added to
	 * it under {@link KanjiaMsgHandler#ADDRESS_KEY_MQ_MSG} before sending.
	 * Messages that cannot be parsed into a JSON object are logged and dropped.
	 *
	 * @param message raw text received from the client
	 * @param session the session the message arrived on
	 */
	@OnMessage
	public void onMessage(String message, Session session) {
		JSONObject msgJson;
		try {
			msgJson = JSON.parseObject(message);
		} catch (RuntimeException e) {
			// The parser signals malformed input with an unchecked exception;
			// treat it the same as any other invalid message.
			log.error("server receive msg=" + message + "，address=" + this.address, e);
			return;
		}
		if (msgJson == null) {
			log.error("server receive msg=" + message + "，address=" + this.address);
			return;
		}

		msgJson.put(KanjiaMsgHandler.ADDRESS_KEY_MQ_MSG, this.address);
		// Hand the message over to RabbitMQ.
		webSocketRabbitMqBridge.sendRabbitMqMsg(this.address, msgJson.toJSONString());
		log.info("server receive msg:address=" + this.address);
	}

	/**
	 * Removes this connection from the registry and drops the address entry
	 * once its last connection is gone.
	 *
	 * @param session     the session being closed
	 * @param closeReason why the session was closed
	 */
	@OnClose
	public void onClose(Session session, CloseReason closeReason) {
		// address is null when onOpen rejected the connection; nothing was registered
		// and ConcurrentHashMap does not accept null keys.
		if (this.address != null) {
			synchronized (REGISTRY_LOCK) {
				CopyOnWriteArrayList<WebSocket> webSocketList = webSocketMap.get(this.address);
				if (webSocketList != null) {
					webSocketList.remove(this);
					if (webSocketList.isEmpty()) {
						webSocketMap.remove(this.address);
					}
				}
			}
		}
		log.info("onClose: address=" + this.address + ",id=" + session.getId() + ",reason=" + closeReason.getReasonPhrase() );
	}

	/**
	 * Logs errors raised on this connection.
	 *
	 * @param t the error reported by the container
	 */
	@OnError
	public void onError(Throwable t) {
		// Pass the throwable itself so the stack trace is not lost.
		log.error("websocket onError:" + t.toString(), t);
	}


	/**
	 * Broadcasts a text message to every open connection registered under
	 * {@code address}. A failure to deliver to one connection is logged and
	 * does not prevent delivery to the remaining ones.
	 *
	 * @author Mobile Web Group-lff
	 * @date 2018-02-11 10:12:27
	 *
	 * @param address target address ({@code biz.key}); {@code null} or unknown addresses are a no-op
	 * @param message text to send
	 */
	public static void publish(String address, String message){
		if (address == null) {
			return;
		}
		CopyOnWriteArrayList<WebSocket> webSocketList = webSocketMap.get(address);
		if (webSocketList == null) {
			return;
		}
		for (WebSocket webSocket : webSocketList) {
			Session target = webSocket.session;
			// Skip connections that closed but have not been unregistered yet.
			if (target == null || !target.isOpen()) {
				continue;
			}
			try {
				// The blocking remote endpoint must not be written to by two threads
				// at once; serialize senders per connection.
				synchronized (webSocket) {
					target.getBasicRemote().sendText(message);
				}
			} catch (IOException e) {
				// Log and carry on with the next connection.
				log.error("发送消息失败：原因=" + e.toString() + ",msg=" + message, e);
			} catch (RuntimeException e) {
				// e.g. IllegalStateException if the session closed mid-send; keep broadcasting.
				log.error("发送消息失败：原因=" + e.toString() + ",msg=" + message, e);
			}
		}
	}

	/**
	 * Hash code derived from the session id (0 when no session is attached),
	 * consistent with {@link #equals(Object)}.
	 */
	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((this.session == null) ? 0 : this.session.getId().hashCode());
		return result;
	}

	/**
	 * Two endpoints are equal when they wrap sessions with the same id, or when
	 * neither has a session attached.
	 */
	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof WebSocket)) {
			// Also covers obj == null.
			return false;
		}
		WebSocket other = (WebSocket) obj;
		if (this.session == null || other.session == null) {
			// Equal only if both are missing a session; avoids dereferencing a null session.
			return this.session == other.session;
		}
		return this.session.getId().equals(other.session.getId());
	}

}
